package kafka

import (
	"errors"
	"fmt"
	"github.com/Shopify/sarama"
	"github.com/rcrowley/go-metrics"
	"time"
)

// Producer is a thin wrapper around a shared sarama.Client plus a default
// topic. Sync/async sarama producers are created from the client per send call.
type Producer struct {
	client sarama.Client // shared connection; per-call producers are built from it
	topic  string        // fallback topic used when a send call passes an empty topic
}

// ProducerSetting holds the configuration used to build a kafka Producer.
// Options not listed here keep sarama's defaults: 30s dial/read/write
// timeouts, no message compression, and hash-by-key partition selection.
type ProducerSetting struct {
	Hosts []string `mapstructure:"hosts"`
	Topic string   `mapstructure:"topic"`
	// ReturnSuccess must be true for synchronous production. For asynchronous
	// production, when true the Successes channel must be drained, otherwise
	// sends block.
	ReturnSuccess bool `mapstructure:"return_success"`
	// ReturnError defaults to true for synchronous production. For asynchronous
	// production, when true the Errors channel must be drained, otherwise
	// sends block.
	ReturnError bool `mapstructure:"return_error"`
	// MaxMessageBytes caps the largest message sent; sarama's default is
	// 1000000 bytes (~1MB).
	MaxMessageBytes int `mapstructure:"max_message_bytes"`
	// RequiredAcks selects the ack mode; default is 1 (WaitForLocal).
	RequiredAcks int `mapstructure:"required_acks"`
	// Timeout is applied to sarama's Producer.Timeout when > 0. The original
	// comment called this a socket timeout in ms, but the field is a
	// time.Duration — NOTE(review): confirm intended units with callers.
	Timeout time.Duration `mapstructure:"timeout"`
	// MaxRetry is the maximum number of send retries (applied when non-zero).
	MaxRetry int `mapstructure:"max_retry"`
}

// NewKafkaProducer builds a Producer from the given settings.
// It panics (via checkProducerConfig) when no hosts are configured, and
// returns an error when the sarama client cannot be created.
func NewKafkaProducer(producerSetting ProducerSetting) (*Producer, error) {
	config := checkProducerConfig(producerSetting)
	client, err := sarama.NewClient(producerSetting.Hosts, config)
	if err != nil {
		// Wrap so callers see which brokers the connection attempt targeted.
		return nil, fmt.Errorf("creating kafka client for %v: %w", producerSetting.Hosts, err)
	}
	return &Producer{client: client, topic: producerSetting.Topic}, nil
}

// checkProducerConfig translates a ProducerSetting into a *sarama.Config.
// It panics when no hosts are configured (treated as a programmer/config bug).
func checkProducerConfig(producerSetting ProducerSetting) *sarama.Config {
	if len(producerSetting.Hosts) == 0 {
		// Panic directly; the old code boxed the string into an interface{}
		// first, which added nothing.
		panic("setting err, no hosts")
	}
	config := sarama.NewConfig()
	if producerSetting.MaxRetry != 0 {
		config.Producer.Retry.Max = producerSetting.MaxRetry
	}
	// Bug fix: MaxMessageBytes was previously read from the setting struct but
	// never applied, so the ~1MB sarama default always won.
	if producerSetting.MaxMessageBytes > 0 {
		config.Producer.MaxMessageBytes = producerSetting.MaxMessageBytes
	}

	// Map the numeric ack setting onto sarama's constants; any other value
	// silently keeps sarama's default (WaitForLocal).
	switch producerSetting.RequiredAcks {
	case -1:
		config.Producer.RequiredAcks = sarama.WaitForAll
	case 0:
		config.Producer.RequiredAcks = sarama.NoResponse
	case 1:
		config.Producer.RequiredAcks = sarama.WaitForLocal
	}

	config.Producer.Return.Successes = producerSetting.ReturnSuccess
	config.Producer.Return.Errors = producerSetting.ReturnError

	if producerSetting.Timeout > 0 {
		config.Producer.Timeout = producerSetting.Timeout
	}
	// Disable sarama's go-metrics collection; the original comment reports it
	// leaks memory, so hand sarama a nil-backed registry instead.
	metrics.UseNilMetrics = true // disable Sarama's go-metrics library
	config.MetricRegistry = metrics.NewRegistry()
	return config
}

// SendMsgSync produces one message synchronously and returns the partition and
// offset it was written to. An empty topic falls back to the Producer's
// default topic. The underlying config must have Return.Successes enabled
// (see ProducerSetting.ReturnSuccess).
func (kafkaProducer *Producer) SendMsgSync(topic string, key string, value string) (partition int32, offset int64, err error) {
	if len(topic) == 0 && len(kafkaProducer.topic) > 0 {
		topic = kafkaProducer.topic
	}
	// NOTE(review): a SyncProducer is created and closed per call; fine at low
	// volume, but worth caching on the Producer if this becomes a hot path.
	syncProducer, err := sarama.NewSyncProducerFromClient(kafkaProducer.client)
	if err != nil {
		// %w keeps the sarama error in the chain for errors.Is/As
		// (the old errors.New(fmt.Sprintf(...)) flattened it to a string).
		return -1, -1, fmt.Errorf("create sync producer: %w", err)
	}
	defer syncProducer.Close()
	msg := sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(value)}
	partition, offset, err = syncProducer.SendMessage(&msg)
	if err != nil {
		return -1, -1, fmt.Errorf("send message: %w", err)
	}
	return partition, offset, nil
}

// SendMsgASync produces one message via sarama's async producer but then waits
// for its delivery report, so it is effectively synchronous (the original TODO
// noted this). An empty topic falls back to the Producer's default topic.
// The underlying config must have Return.Successes and Return.Errors enabled,
// otherwise the select below blocks forever.
func (kafkaProducer *Producer) SendMsgASync(topic string, key string, value string) error {
	if len(topic) == 0 && len(kafkaProducer.topic) > 0 {
		topic = kafkaProducer.topic
	}
	asyncProducer, err := sarama.NewAsyncProducerFromClient(kafkaProducer.client)
	if err != nil {
		// %w keeps the sarama error inspectable via errors.Is/As.
		return fmt.Errorf("create async producer: %w", err)
	}
	defer asyncProducer.Close()
	msg := sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(value)}
	asyncProducer.Input() <- &msg

	// Wait for this message's delivery report.
	select {
	case <-asyncProducer.Successes():
		return nil
	case fail := <-asyncProducer.Errors():
		// Guard the nil-interface trap: a closed Errors() channel yields a nil
		// *sarama.ProducerError; returning it directly produces a non-nil
		// error interface whose Error() would panic.
		if fail == nil {
			return errors.New("async producer error channel closed")
		}
		// Return the typed *sarama.ProducerError as-is so callers can use
		// errors.As to reach the failed message.
		return fail
	}
}

// Close releases the underlying sarama client. When producing asynchronously,
// Close must be called before the program exits, otherwise buffered unsent
// data may be discarded.
func (kafkaProducer *Producer) Close() {
	// The close error is deliberately discarded to keep the signature stable.
	_ = kafkaProducer.client.Close()
}
